use crate::app::core::{ControlEvent, ControlEventDispenser, EventKind};
use crate::config::{EntryPoint, ServicesLister};
use anyhow::Result;
use futures::prelude::*;
use k8s_openapi::api::core::v1::Service;
use kube::core::params::ListParams;
use kube::runtime::watcher::Event;
use kube::{runtime::watcher, Api, Client};
use std::collections::BTreeMap;
use std::sync::Arc;

pub struct SvcController {
    client: Client,
    cfg: ServicesLister,
}
impl SvcController {
    pub fn new(client: Client, cfg: ServicesLister) -> Self {
        Self { client, cfg }
    }
}

impl SvcController {
    pub fn watch_service(self, dispenser: Arc<dyn ControlEventDispenser>) -> Result<()> {
        let cfg = self.cfg;
        if !cfg.enable {
            return Ok(());
        }
        let client = self.client;
        let api: Api<Service> = match cfg.namespaces {
            Some(ref s) => Api::namespaced(client, s),
            None => Api::all(client),
        };
        let params = ListParams::default().labels(&cfg.selector);
        let mut watcher = watcher(api, params).boxed();
        tokio::spawn(async move {
            wd_log::log_info_ln!(
                "开启service监听,namespaces=[{}] selector=[{}]",
                 &cfg.namespaces.as_ref().unwrap_or(&String::new()),
                &cfg.selector
            );
            #[allow(irrefutable_let_patterns)]
            while let res = watcher.try_next().await {
                let opt = match res {
                    Ok(o) => o,
                    Err(err) => {
                        wd_log::log_error_ln!("watch sevices error：{:?}", err);
                        continue;
                    }
                };
                let svc = match opt {
                    Some(s) => s,
                    None => {
                        wd_log::log_info_ln!("get service is null");
                        continue;
                    }
                };
                wd_log::log_info_ln!("接收到一个k8s event:{:?}", &svc);
                for i in Self::event_services_to_control_event(svc).into_iter() {
                    if let Err(err) = dispenser.dispatch(i).await {
                        wd_log::log_error_ln!(
                            "k8s cluster services event dispatch error:{}",
                            err.to_string()
                        );
                    }
                }
            }
        });
        Ok(())
    }

    fn event_services_to_control_event(e: Event<Service>) -> Vec<ControlEvent> {
        match e {
            Event::Applied(svc) => Self::service_to_ces(svc, true),
            Event::Deleted(svc) => Self::service_to_ces(svc, false),
            Event::Restarted(svc) => {
                let mut list = vec![];
                for i in svc.into_iter() {
                    list.append(&mut Self::service_to_ces(i, true));
                }
                list
            }
        }
    }
    fn service_to_ces(svc: Service, is_build: bool) -> Vec<ControlEvent> {
        let mut list = vec![];
        let name = if let Some(s) = svc.metadata.name {
            s
        } else {
            return list;
        };
        if svc.spec.is_none() || svc.spec.as_ref().unwrap().ports.is_none() {
            return list;
        }
        if !is_build {
            return vec![ControlEvent::default().set_kind(EventKind::RemoveEP(name))];
        }
        let mut map = svc.metadata.labels.unwrap_or(BTreeMap::new());
        for i in svc.spec.unwrap().ports.unwrap().into_iter() {
            // wd_log::log_debug_ln!("services port:{:?}",&i);
            let mut cfg = EntryPoint::default();
            cfg.name = format!("{}-{}", &name, i.name.as_ref().unwrap_or(&String::new()));
            cfg.addr = format!("0.0.0.0:{}", i.port);
            cfg.ca_file = map.remove(&format!("{}_ca", i.name.as_ref().unwrap_or(&String::new())));
            cfg.private_key = map.remove(&format!(
                "{}_private_key",
                i.name.as_ref().unwrap_or(&String::new())
            ));
            cfg.publish_key = map.remove(&format!(
                "{}_public_key",
                i.name.as_ref().unwrap_or(&String::new())
            ));
            // if let Some(s) = i.name {
            //     match &*(s.to_ascii_lowercase()) {
            //         "rust_ingress_grpc"=>cfg.ty = EPType::Grpc,
            //         "rust_ingress_tcp"=>cfg.ty = EPType::Tcp,
            //         "rust_ingress_websocket"=>cfg.ty = EPType::WebSocket,
            //         _ =>{}
            //     }
            // }
            let ep = ControlEvent::default().set_kind(EventKind::BuildEP(cfg));
            list.push(ep);
        }
        return list;
    }
}
